/*
 * QuorumPeer.hh
 *
 *  Created on: 2017-11-22
 *      Author: cxxjava@163.com
 */

#ifndef QuorumPeer_HH_
#define QuorumPeer_HH_

#include "Efc.hh"
#include "ELog.hh"

#include "./Follower.hh"
#include "./Leader.hh"
#include "./Observer.hh"

#include "./QuorumMaj.hh"
#include "./Vote.hh"
#include "./Election.hh"
#include "./QuorumCnxManager.hh"
#include "./QuorumAuthServer.hh"
#include "./QuorumServer.hh"
#include "./QuorumStats.hh"
#include "./QuorumAuth.hh"
#include "./NullQuorumAuthServer.hh"
#include "./NullQuorumAuthLearner.hh"
#include "./ReadOnlyZooKeeperServer.hh"
#include "../ServerStats.hh"
#include "../ServerCnxnFactory.hh"
#include "../util/ZxidUtils.hh"
#include "../persistence/FileTxnSnapLog.hh"
#include "../../common/AtomicFileOutputStream.hh"

namespace efc {
namespace ezk {

/**
 * This class manages the quorum protocol. There are three states this server
 * can be in:
 * <ol>
 * <li>Leader election - each server will elect a leader (proposing itself as a
 * leader initially).</li>
 * <li>Follower - the server will synchronize with the leader and replicate any
 * transactions.</li>
 * <li>Leader - the server will process requests and forward them to followers.
 * A majority of followers must log the request before it can be accepted.</li>
 * </ol>
 *
 * This class will setup a datagram socket that will always respond with its
 * view of the current leader. The response will take the form of:
 *
 * <pre>
 * int xid;
 *
 * long myid;
 *
 * long leader_id;
 *
 * long leader_zxid;
 * </pre>
 *
 * The request for the current leader will consist solely of an xid: int xid;
 */

class QuorumPeer : public ZooKeeperThread , virtual public QuorumStats::Provider {
private:
	static sp<ELogger> LOG;// = LoggerFactory.getLogger(QuorumPeer.class);

public:
	/**
	 * Helper thread that answers "who is the current leader" requests.
	 *
	 * @deprecated As of release 3.4.0: it belongs to the udp-based leader
	 * election variants, which are being deprecated as well.
	 * <p>
	 * A request carries only an xid chosen by the requestor. The response
	 * carries that xid, the id of this server, the id of the leader and the
	 * zxid of the leader.
	 */
	class ResponderThread : public ZooKeeperThread {
	private:
		// Owning peer; not owned by this thread object.
		QuorumPeer* peer;

	public:
		// Loop flag; QuorumPeer::stopLeaderElection() clears it.
		volatile boolean running;

		ResponderThread(QuorumPeer* p) :
				ZooKeeperThread("ResponderThread"),
				peer(p),
				running(true) {
		}

		virtual void run();
	};

    /*
     * Role of this peer. Every constructor of this class initialises it to
     * LearnerType::PARTICIPANT; setLearnerType() may change it later.
     */
	QuorumServer::LearnerType learnerType;


    /**
     * QuorumVerifier implementation. The parameterised constructors fall back
     * to a QuorumMaj (majority) when no verifier is supplied.
     */
	sp<QuorumVerifier> quorumConfig;

    /**
     * My id
     */
	llong myid;


    /**
     * This is who I think the leader currently is.
     * Accessed through getCurrentVote()/setCurrentVote() under SYNCHRONIZED(this).
     */
    sp<Vote> currentVote;

    /**
     * ... and its counterpart for backward compatibility.
     * getBCVote() falls back to currentVote while this is null.
     */
    sp<Vote> bcVote;

    // Run flag, set to true by the constructors; see setRunning()/isRunning().
    volatile boolean running;

    // Current server state, set to ServerState::LOOKING by the constructors.
    ServerState state;

    // Quorum address of this peer; filled in by startLeaderElection() from the view.
    sp<EInetSocketAddress> myQuorumAddr;

	// Election algorithm selector; the value 0 additionally starts the UDP responder.
	int electionType;

	// Snapshot / transaction log access; also locates the epoch files.
	sp<FileTxnSnapLog> logFactory;

	// Statistics provider created by the constructors.
	sp<QuorumStats> quorumStats_;

    // UDP socket opened by startLeaderElection() when electionType == 0.
    sp<EDatagramSocket> udpSocket;

    // Election implementation produced by createElectionAlgorithm().
    sp<Election> electionAlg;

    // Client connection factory; started by start().
    sp<ServerCnxnFactory> cnxnFactory;

    // Responder thread; only created when electionType == 0, otherwise null.
    sp<ResponderThread> responder;

    // Cached epochs; -1 means "not loaded yet" (see getAcceptedEpoch()/getCurrentEpoch()).
    llong acceptedEpoch;
    llong currentEpoch;

    // Quorum connection manager and the auth objects handed to it.
    sp<QuorumCnxManager> qcm;
    sp<QuorumAuthServer> authServer;
    sp<QuorumAuthLearner> authLearner;
    // VisibleForTesting. This flag is used to know whether qLearner's and
    // qServer's login context has been initialized as ApacheDS has concurrency
    // issues. Refer https://issues.apache.org/jira/browse/ZOOKEEPER-2712
    // Initialised to false by the constructors.
    boolean authInitialized;

    /* ZKDatabase is a top level member of quorumpeer
     * which will be used in all the zookeeperservers
     * instantiated later. Also, it is created once on
     * bootup and only thrown away in case of a truncate
     * message from the leader
     */
    sp<ZKDatabase> zkDb;

    /**
     * Reads the first line of a file in the snapshot directory and parses it
     * as a long long.
     *
     * Mirrors the Java original (BufferedReader.readLine + Long.parseLong).
     *
     * @param name file name, resolved against logFactory->getSnapDir()
     * @return the parsed value
     * @throws EIOException if the first line is missing or is not a number;
     *         exceptions raised while opening/reading the file propagate
     */
    llong readLongFromFile(EString name) THROWS(EIOException) {
        EFile file(logFactory->getSnapDir(), name.c_str());
        ERandomAccessFile raf(&file, "r");
        sp<EString> line = raf.readLine();
        // readLine() can return null; keep a printable copy so that the
        // error path below never dereferences a null pointer.
        EString text((line != null) ? line->c_str() : "");
        try {
            return ELLong::parseLLong(text.c_str());
        } catch (ENumberFormatException& e) {
            throw EIOException(__FILE__, __LINE__,
                    ("Found " + text + " in " + file.toString()).c_str());
        }
    }

	/**
	 * Write a long value to disk atomically. Either succeeds or an exception
	 * is thrown.
	 *
	 * The value is written as decimal text through an AtomicFileOutputStream.
	 * On success the stream is closed by the finally section; on failure it
	 * is aborted and NOT closed.
	 *
	 * @param name file name (inside the snapshot directory) to write the long to
	 * @param value the long value to write to the named file
	 * @throws EIOException if the file cannot be written atomically
	 */
    void writeLongToFile(EString name, llong value) THROWS(EIOException) {
        EFile file (logFactory->getSnapDir(), name.c_str());
        AtomicFileOutputStream out(&file);
        EBufferedOutputStream bos(&out);
        boolean aborted = false;
        ON_FINALLY_NOTHROW(
			if (!aborted) {
				// if the close operation (rename) fails we'll get notified.
				// worst case the tmp file may still exist
				out.close();
			}
        ) {
			try {
				bos.write(ELLong::toString(value).c_str());
				bos.flush();
				out.flush();
			} catch (EIOException& e) {
				LOG->error("Failed to write new file " + file.toString(), e);
				// worst case here the tmp file/resources(fd) are not cleaned up
				//   and the caller will be notified (IOException)
				aborted = true;
				out.abort();
				// Rethrow the in-flight exception; "throw e" would copy it
				// and slice any type derived from EIOException.
				throw;
			}
        }}
    }

	/**
	 * Loads the ZKDatabase and brings the cached currentEpoch/acceptedEpoch
	 * in line with the epoch files in the snapshot directory.
	 *
	 * A missing epoch file is created from the epoch of the last processed
	 * zxid. Any EIOException raised on the way is logged and rethrown wrapped
	 * in an ERuntimeException.
	 */
	void loadDataBase() {
		// Marker file; see the "terminated after taking a snapshot" message below.
		EFile updating(logFactory->getSnapDir(), UPDATING_EPOCH_FILENAME);
		try {
			zkDb->loadDataBase();

			// load the epochs
			llong lastProcessedZxid = zkDb->getDataTree()->lastProcessedZxid;
			llong epochOfZxid = ZxidUtils::getEpochFromZxid(lastProcessedZxid);
			try {
				currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
				// The data is ahead of the stored epoch and the marker exists:
				// adopt the epoch of the data and remove the marker.
				if (epochOfZxid > currentEpoch && updating.exists()) {
					LOG->info(
							EString(UPDATING_EPOCH_FILENAME)
									+ " found. The server was terminated after "
											"taking a snapshot but before updating current "
											"epoch. Setting current epoch to "
									+ epochOfZxid);
					setCurrentEpoch(epochOfZxid);
					if (!updating.remove()) {
						throw EIOException(__FILE__, __LINE__,
								("Failed to delete " + updating.toString()).c_str());
					}
				}
			} catch (EFileNotFoundException& e) {
				// pick a reasonable epoch number
				// this should only happen once when moving to a
				// new code version
				currentEpoch = epochOfZxid;
				LOG->info(
						EString(CURRENT_EPOCH_FILENAME)
								+ " not found! Creating with a reasonable default of "
								+ currentEpoch
								+ ". This should only happen when you are upgrading your installation");
				writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
			}
			// Without the marker file a stored epoch older than the data is fatal.
			if (epochOfZxid > currentEpoch) {
				throw EIOException(__FILE__, __LINE__,
						("The current epoch, "
								+ ZxidUtils::zxidToString(currentEpoch)
								+ ", is older than the last zxid, "
								+ lastProcessedZxid).c_str());
			}
			try {
				acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
			} catch (EFileNotFoundException& e) {
				// pick a reasonable epoch number
				// this should only happen once when moving to a
				// new code version
				acceptedEpoch = epochOfZxid;
				LOG->info(__FILE__, __LINE__,
						EString(ACCEPTED_EPOCH_FILENAME)
								+ " not found! Creating with a reasonable default of "
								+ acceptedEpoch
								+ ". This should only happen when you are upgrading your installation");
				writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
			}
			// The accepted epoch must never be behind the current epoch.
			if (acceptedEpoch < currentEpoch) {
				throw EIOException(__FILE__, __LINE__,
						("The accepted epoch, "
								+ ZxidUtils::zxidToString(acceptedEpoch)
								+ " is less than the current epoch, "
								+ ZxidUtils::zxidToString(currentEpoch)).c_str());
			}
		} catch (EIOException& ie) {
			// Any I/O problem is escalated to a runtime exception.
			LOG->error("Unable to load database on disk", ie);
			throw ERuntimeException(__FILE__, __LINE__, "Unable to run quorum server ", &ie);
		}
	}

    /**
     * Counts the entries of a peer map whose type is PARTICIPANT.
     *
     * @param peers map of server id to QuorumServer; must not be null
     * @return number of servers with LearnerType::PARTICIPANT
     */
    static int countParticipants(EHashMap<llong, QuorumServer*>* peers) {
        int participants = 0;
        for (auto it = peers->values()->iterator(); it->hasNext(); ) {
            QuorumServer* server = it->next();
            if (server->type != QuorumServer::LearnerType::PARTICIPANT) {
                continue;
            }
            ++participants;
        }
        return participants;
    }

//protected:
public:

    /**
     * The number of milliseconds of each tick
     */
    int tickTime;

    /**
     * Minimum number of milliseconds to allow for session timeout.
     * A value of -1 indicates unset; getMinSessionTimeout() then returns
     * tickTime * 2.
     */
    int minSessionTimeout;

    /**
     * Maximum number of milliseconds to allow for session timeout.
     * A value of -1 indicates unset; getMaxSessionTimeout() then returns
     * tickTime * 20.
     */
    int maxSessionTimeout;

    /**
     * The number of ticks that the initial synchronization phase can take
     */
    int initLimit;

    /**
     * The number of ticks that can pass between sending a request and getting
     * an acknowledgment
     */
    int syncLimit;
    
    /**
     * Enables/Disables sync request processor. This option is enabled
     * by default (the constructors set it to true) and is to be used with
     * observers. The SYNC_ENABLED system property overrides it, see
     * getSyncEnabled().
     */
    boolean syncEnabled;

    /**
     * The current tick
     */
    EAtomicInteger tick;

    /**
     * Whether or not to listen on all IPs for the two quorum ports
     * (broadcast and fast leader election). The constructors default it to
     * false.
     */
    boolean quorumListenOnAllIPs;

    /**
     * Kerberos quorum service principal.
     * NOTE(review): the Java original documents a default of
     * 'zkquorum/localhost'; this header assigns no such default - confirm.
     */
    EString quorumServicePrincipal;

    /**
     * Quorum learner login context name in jaas-conf file to read the kerberos
     * security details.
     * NOTE(review): the documented Java default 'QuorumLearner' is not
     * assigned anywhere in this header - confirm.
     */
    EString quorumLearnerLoginContext;

    /**
     * Quorum server login context name in jaas-conf file to read the kerberos
     * security details.
     * NOTE(review): the documented Java default 'QuorumServer' is not
     * assigned anywhere in this header - confirm.
     */
    EString quorumServerLoginContext;

    // TODO: need to tune the default value of thread size
    static const int QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE = 20;
    /**
     * The maximum number of threads to allow in the connectionExecutors thread
     * pool which will be used to initiate quorum server connections.
     * setQuorumCnxnThreadsSize() only accepts values above the default.
     */
    int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;

    /**
     * Keeps time taken for leader election in milliseconds. Sets the value to
     * this variable only after the completion of leader election.
     * Initialised to -1 by the constructors.
     */
    llong electionTimeTaken;

public:
    // Role objects; assigned through setFollower()/setLeader()/setObserver().
    sp<Follower> follower;
    sp<Leader> leader;
    sp<Observer> observer;

    /*
     * To enable observers to have no identifier, we need a generic identifier
     * at least for QuorumCnxManager. We use the following constant as the
     * value of such a generic identifier.
     */

    static const llong OBSERVER_ID = ELLong::MAX_VALUE;

    /**
     * The syncEnabled can also be set via a system property.
     */
    constexpr static const char* SYNC_ENABLED = "zookeeper.observer.syncEnabled";

    // File names (inside the snapshot directory) used to persist epoch state.
    constexpr static const char* CURRENT_EPOCH_FILENAME = "currentEpoch";

    constexpr static const char* ACCEPTED_EPOCH_FILENAME = "acceptedEpoch";

    constexpr static const char* UPDATING_EPOCH_FILENAME = "updatingEpoch";

    /*
     * Record leader election time
     */
    llong start_fle, end_fle;

    // Server id -> server description for the whole ensemble. Raw pointer,
    // never deleted in this header - presumably owned by the caller; verify.
    EHashMap<llong, QuorumServer*>* quorumPeers;

    /** @return the learner type currently recorded for this peer */
    QuorumServer::LearnerType getLearnerType() {
        return this->learnerType;
    }

    /**
     * Records the learner type on this peer and, when this peer is present
     * in the peer map, on its map entry as well. A missing entry is only
     * logged as an error.
     */
    void setLearnerType(QuorumServer::LearnerType p) {
        this->learnerType = p;
        if (!this->quorumPeers->containsKey(this->myid)) {
            LOG->error(EString("Setting LearnerType to ") + p + " but " + myid
                    + " not in QuorumPeers. ");
            return;
        }
        this->quorumPeers->get(myid)->type = p;
    }
    /**
     * The servers that make up the cluster
     */
    int getQuorumSize(){
        return getVotingView()->size();
    }

    /**
     * @return the id of this quorum peer (same value as getMyid())
     */
    llong getId() {
        return this->myid;
    }

    synchronized
    sp<Vote> getCurrentVote(){
    	SYNCHRONIZED(this) {
    		return currentVote;
    	}}
    }

    /** Replaces the current vote under SYNCHRONIZED(this). */
    synchronized
    void setCurrentVote(sp<Vote> v) {
        SYNCHRONIZED(this) {
            this->currentVote = v;
        }}
    }

    /**
     * @return the backward-compatibility vote, or the current vote while no
     *         backward-compatibility vote has been set
     */
    synchronized
    sp<Vote> getBCVote() {
        SYNCHRONIZED(this) {
            if (bcVote != null) {
                return bcVote;
            }
            return currentVote;
        }}
    }

    /** Replaces the backward-compatibility vote under SYNCHRONIZED(this). */
    synchronized
    void setBCVote(sp<Vote> v) {
        SYNCHRONIZED(this) {
            this->bcVote = v;
        }}
    }

    /** Stores the new server state under SYNCHRONIZED(this). */
    synchronized
    void setPeerState(ServerState newState){
        SYNCHRONIZED(this) {
            this->state = newState;
        }}
    }

    const char* getPeerStateName() {
    	return getStateName(getPeerState());
    }

    /** @return the current server state, read under SYNCHRONIZED(this) */
    synchronized
    ServerState getPeerState(){
        SYNCHRONIZED(this) {
            return this->state;
        }}
    }

    /** @return this peer's quorum address (null until startLeaderElection() resolves it) */
    sp<EInetSocketAddress> getQuorumAddress(){
        return this->myQuorumAddr;
    }

public:
    QuorumPeer() : ZooKeeperThread("QuorumPeer"),
        learnerType(QuorumServer::LearnerType::PARTICIPANT),
    	running(true),
    	state(ServerState::LOOKING),
    	acceptedEpoch(-1),
    	currentEpoch(-1),
    	authInitialized(false),
    	minSessionTimeout(-1),
    	maxSessionTimeout(-1),
    	syncEnabled(true),
    	quorumListenOnAllIPs(false),
    	electionTimeTaken(-1),
    	start_fle(0),
    	end_fle(0)
    {
    	quorumStats_ = new QuorumStats(this);
        initialize();
    }
    
   
    /**
     * Convenience constructor: does not listen on all IPs and, for backward
     * compatibility purposes, instantiates QuorumMaj as the verifier.
     *
     * Delegates to the full constructor with quorumListenOnAllIPs = false and
     * a null verifier, which makes that constructor create the QuorumMaj.
     */
    QuorumPeer(EHashMap<llong, QuorumServer*>* quorumPeers, EFile* dataDir,
            EFile* dataLogDir, int electionType,
            llong myid, int tickTime, int initLimit, int syncLimit,
            sp<ServerCnxnFactory> cnxnFactory) THROWS(EIOException) :
            QuorumPeer(quorumPeers, dataDir, dataLogDir, electionType,
                    myid, tickTime, initLimit, syncLimit,
                    false, cnxnFactory, sp<QuorumVerifier>()) {
    }
    
    /**
     * Fully parameterised constructor.
     *
     * @param quorumPeers server id -> server map of the ensemble (not copied)
     * @param dataDir / dataLogDir directories handed to FileTxnSnapLog
     * @param quorumConfig verifier to use; when null a QuorumMaj sized by
     *        countParticipants(quorumPeers) is created
     */
    QuorumPeer(EHashMap<llong, QuorumServer*>* quorumPeers, EFile* dataDir,
            EFile* dataLogDir, int electionType,
            llong myid, int tickTime, int initLimit, int syncLimit,
            boolean quorumListenOnAllIPs,
            sp<ServerCnxnFactory> cnxnFactory,
            sp<QuorumVerifier> quorumConfig) THROWS(EIOException) :
            ZooKeeperThread("QuorumPeer"),
            learnerType(QuorumServer::LearnerType::PARTICIPANT),
            myid(myid),
            running(true),
            state(ServerState::LOOKING),
            electionType(electionType),
            cnxnFactory(cnxnFactory),
            acceptedEpoch(-1),
            currentEpoch(-1),
            authInitialized(false),
            tickTime(tickTime),
            minSessionTimeout(-1),
            maxSessionTimeout(-1),
            initLimit(initLimit),
            syncLimit(syncLimit),
            syncEnabled(true),
            quorumListenOnAllIPs(quorumListenOnAllIPs),
            electionTimeTaken(-1),
            start_fle(0),
            end_fle(0),
            quorumPeers(quorumPeers) {
        this->quorumStats_ = new QuorumStats(this);
        initialize();

        this->logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
        this->zkDb = new ZKDatabase(this->logFactory);
        if (quorumConfig != null) {
            this->quorumConfig = quorumConfig;
        } else {
            this->quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
        }
    }

    void initialize() {
        // init quorum auth server & learner
		authServer = new NullQuorumAuthServer();
		authLearner = new NullQuorumAuthLearner();
    }

    /** @return the statistics object created by the constructor */
    sp<QuorumStats> quorumStats() {
        return this->quorumStats_;
    }
    
    /**
     * Boots the peer: loads the database, starts the client connection
     * factory, starts leader election and finally starts this thread.
     * The order of these steps is significant.
     */
    synchronized
    virtual void start() {
        SYNCHRONIZED(this) {
            this->loadDataBase();
            this->cnxnFactory->start();
            this->startLeaderElection();
            ZooKeeperThread::start();
        }}
    }

    /**
     * Stops the UDP responder thread, if there is one.
     *
     * startLeaderElection() only creates the responder when electionType is
     * 0; for every other election type the pointer stays null, so it must be
     * checked before use.
     */
    synchronized
    void stopLeaderElection() {
    	SYNCHRONIZED(this) {
			if (responder != null) {
				responder->running = false;
				responder->interrupt();
			}
    	}}
    }

    /**
     * Prepares leader election: casts the initial vote for this peer, looks
     * up this peer's quorum address in the view, starts the UDP responder
     * when electionType is 0 and creates the election algorithm.
     *
     * @throws ERuntimeException if the epoch cannot be read, this peer's id
     *         is not in the view, or the UDP socket cannot be opened
     */
    synchronized
    void startLeaderElection() {
        SYNCHRONIZED(this) {
            try {
                currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
            } catch(EIOException& e) {
                ERuntimeException re(e.getSourceFile(), e.getSourceLine(), e.getMessage());
                re.initCause(&e);
                throw re;
            }

            // Find our own entry in the view to learn our quorum address.
            for (auto it = getView()->values()->iterator(); it->hasNext(); ) {
                QuorumServer* candidate = it->next();
                if (candidate->id != myid) {
                    continue;
                }
                myQuorumAddr = candidate->addr;
                break;
            }
            if (myQuorumAddr == null) {
                throw ERuntimeException(__FILE__, __LINE__, (EString("My id ") + myid + " not in the peer list").c_str());
            }

            if (electionType == 0) {
                try {
                    udpSocket = new EDatagramSocket(myQuorumAddr->getPort());
                    responder = new ResponderThread(this);
                    EThread::setDaemon(responder, true); //!!!
                    responder->start();
                } catch (ESocketException& e) {
                    throw ERuntimeException(e.getSourceFile(), e.getSourceLine(), &e);
                }
            }
            this->electionAlg = createElectionAlgorithm(electionType);
        }}
    }
    
    /**
     * Returns the highest zxid that this host has seen, loading the database
     * first if it has not been initialized yet.
     *
     * @return the last processed zxid of the data tree
     */
    llong getLastLoggedZxid() {
        const boolean loaded = zkDb->isInitialized();
        if (!loaded) {
            this->loadDataBase();
        }
        return zkDb->getDataTreeLastProcessedZxid();
    }

protected:
    // Factory hooks for the three server roles. Only declared here; the
    // definitions live outside this header.
    sp<Follower> makeFollower(sp<FileTxnSnapLog> logFactory) THROWS(EIOException);
     
    sp<Leader> makeLeader(sp<FileTxnSnapLog> logFactory) THROWS(EIOException);
    
    sp<Observer> makeObserver(sp<FileTxnSnapLog> logFactory) THROWS(EIOException);

    // Builds the election implementation for the given election type;
    // called from startLeaderElection(). Defined outside this header.
    sp<Election> createElectionAlgorithm(int electionAlgorithm);

    // Defined outside this header.
    sp<Election> makeLEStrategy();

    /** Stores the leader role object under SYNCHRONIZED(this). */
    synchronized
    void setLeader(sp<Leader> newLeader){
        SYNCHRONIZED(this) {
            this->leader = newLeader;
        }}
    }

    /** Stores the follower role object under SYNCHRONIZED(this). */
    synchronized
    void setFollower(sp<Follower> newFollower){
        SYNCHRONIZED(this) {
            this->follower = newFollower;
        }}
    }
    
    /** Stores the observer role object under SYNCHRONIZED(this). */
    synchronized
    void setObserver(sp<Observer> newObserver){
        SYNCHRONIZED(this) {
            this->observer = newObserver;
        }}
    }

public:
    // Declared here, defined outside this header.
    synchronized sp<ZooKeeperServer> getActiveServer();

    // Thread body of the peer (virtual; defined outside this header).
    virtual void run();

    // Virtual shutdown hook; defined outside this header.
    virtual void shutdown();

    /**
     * A 'view' is a node's current opinion of the membership of the entire
     * ensemble.
     *
     * Unlike the Java original (Collections.unmodifiableMap) this hands out
     * the internal map itself, so callers are able to modify it.
     */
    EHashMap<llong, QuorumServer*>* getView() {
        return this->quorumPeers;
    }

    /**
     * Observers are not contained in this view, only nodes with 
     * PeerType=PARTICIPANT.
     * Declared here; defined outside this header.
     */
    sp<EHashMap<llong,QuorumServer*> > getVotingView();

    // Static counterpart operating on an arbitrary view; presumably the
    // filter used by getVotingView() - defined outside this header, verify.
    static sp<EHashMap<llong,QuorumServer*> > viewToVotingView(
    		EHashMap<llong, QuorumServer*>* view);

    /**
     * Check if a node is in the current view. With static membership, the
     * result of this check will never change; only when dynamic membership
     * is introduced will this be more useful.
     *
     * @param sid server id to look up
     * @return true when the peer map has an entry for sid
     */
    boolean viewContains(llong sid) {
        return this->quorumPeers->containsKey(sid);
    }
    
    /**
     * Only used by QuorumStats at the moment.
     * Declared here; defined outside this header.
     */
    EA<EString*> getQuorumPeers();

    /**
     * Maps the current peer state to its display string.
     *
     * @return one of the *_STATE strings, UNKNOWN_STATE for any other value
     */
    EString getServerState() {
        const ServerState current = getPeerState();
        if (current == LOOKING) {
            return LOOKING_STATE;
        }
        if (current == LEADING) {
            return LEADING_STATE;
        }
        if (current == FOLLOWING) {
            return FOLLOWING_STATE;
        }
        if (current == OBSERVING) {
            return OBSERVING_STATE;
        }
        return UNKNOWN_STATE;
    }


    /**
     * @return the id of this quorum peer
     */
    llong getMyid() {
        return this->myid;
    }

    /**
     * set the id of this quorum peer.
     */
    void setMyid(long myid) {
    	this->myid = myid;
    }

    /**
     * Get the number of milliseconds of each tick
     */
    int getTickTime() {
        return tickTime;
    }

    /**
     * Set the number of milliseconds of each tick
     */
    void setTickTime(int tickTime) {
        LOG->info(EString("tickTime set to ") + tickTime);
        this->tickTime = tickTime;
    }

    /** Maximum number of connections allowed from particular host (ip) */
    int getMaxClientCnxnsPerHost() {
    	sp<ServerCnxnFactory> fac = getCnxnFactory();
        if (fac == null) {
            return -1;
        }
        return fac->getMaxClientCnxnsPerHost();
    }
    
    /**
     * @return minimum session timeout in milliseconds; tickTime * 2 while
     *         the value is unset (-1)
     */
    int getMinSessionTimeout() {
        if (minSessionTimeout == -1) {
            return tickTime * 2;
        }
        return minSessionTimeout;
    }

    /** minimum session timeout in milliseconds */
    void setMinSessionTimeout(int min) {
        LOG->info(EString("minSessionTimeout set to ") + min);
        this->minSessionTimeout = min;
    }

    /**
     * @return maximum session timeout in milliseconds; tickTime * 20 while
     *         the value is unset (-1)
     */
    int getMaxSessionTimeout() {
        if (maxSessionTimeout == -1) {
            return tickTime * 20;
        }
        return maxSessionTimeout;
    }

    /** minimum session timeout in milliseconds */
    void setMaxSessionTimeout(int max) {
        LOG->info(EString("maxSessionTimeout set to ") + max);
        this->maxSessionTimeout = max;
    }

    /**
     * Get the number of ticks that the initial synchronization phase can take
     */
    int getInitLimit() {
        return initLimit;
    }

    /**
     * Set the number of ticks that the initial synchronization phase can take
     */
    void setInitLimit(int initLimit) {
        LOG->info(EString("initLimit set to ") + initLimit);
        this->initLimit = initLimit;
    }

    /**
     * @return the current value of the tick counter
     */
    int getTick() {
        const int current = tick.get();
        return current;
    }
    
    /**
     * @return the QuorumVerifier in use
     */
    sp<QuorumVerifier> getQuorumVerifier(){
        return this->quorumConfig;
    }
    
    /** Replaces the QuorumVerifier in use. */
    void setQuorumVerifier(sp<QuorumVerifier> quorumConfig){
        this->quorumConfig = quorumConfig;
    }
    
    /**
     * @return the election algorithm instance (null before
     *         startLeaderElection() has created it)
     */
    sp<Election> getElectionAlg(){
        return this->electionAlg;
    }
        
    /**
     * Get the synclimit
     */
    int getSyncLimit() {
        return syncLimit;
    }

    /**
     * Stores the sync limit in ticks.
     */
    void setSyncLimit(int syncLimit) {
        this->syncLimit = syncLimit;
    }
    
    /**
     * Returns whether the sync request processor is enabled.
     *
     * The SYNC_ENABLED system property, when present, wins over the field
     * and its parsed value is logged.
     *
     * @return the property value if set, otherwise syncEnabled
     */
    boolean getSyncEnabled() {
        const char* enabled = ESystem::getProperty(SYNC_ENABLED);
        if (enabled == null) {
            return syncEnabled;
        }
        boolean b = EBoolean::parseBoolean(enabled);
        LOG->info(SYNC_ENABLED + EString("=") + b);
        return b;
    }
    
    /**
     * Stores the syncEnabled flag (the system property may still override
     * it, see getSyncEnabled()).
     *
     * @param syncEnabled new flag value
     */
    void setSyncEnabled(boolean syncEnabled) {
        this->syncEnabled = syncEnabled;
    }

    /**
     * Gets the election type
     */
    int getElectionType() {
        return electionType;
    }

    /**
     * Stores the election type.
     */
    void setElectionType(int electionType) {
        this->electionType = electionType;
    }

    /** @return whether the quorum ports listen on all IPs */
    boolean getQuorumListenOnAllIPs() {
        return this->quorumListenOnAllIPs;
    }

    /** Stores whether the quorum ports listen on all IPs. */
    void setQuorumListenOnAllIPs(boolean quorumListenOnAllIPs) {
        this->quorumListenOnAllIPs = quorumListenOnAllIPs;
    }

    /** @return the client connection factory (may be null) */
    sp<ServerCnxnFactory> getCnxnFactory() {
        return this->cnxnFactory;
    }

    /** Replaces the client connection factory. */
    void setCnxnFactory(sp<ServerCnxnFactory> cnxnFactory) {
        this->cnxnFactory = cnxnFactory;
    }

    /** Replaces the peer map pointer; the map itself is not copied. */
    void setQuorumPeers(EHashMap<llong, QuorumServer*>* quorumPeers) {
        this->quorumPeers = quorumPeers;
    }

    int getClientPort() {
        return cnxnFactory->getLocalPort();
    }

    /** Intentionally a no-op: the address argument is ignored. */
    void setClientPortAddress(sp<EInetSocketAddress> addr) {
        // nothing to do
    }
 
    /** Replaces the snapshot / transaction log factory. */
    void setTxnFactory(sp<FileTxnSnapLog> factory) {
        logFactory = factory;
    }
    
    /** @return the snapshot / transaction log factory */
    sp<FileTxnSnapLog> getTxnFactory() {
        return logFactory;
    }

    /**
     * Replaces the zk database used by this node.
     *
     * @param database database to use from now on
     */
    void setZKDatabase(sp<ZKDatabase> database) {
        zkDb = database;
    }

    /** @return the zk database of this node */
    sp<ZKDatabase> getZkDb() {
        return this->zkDb;
    }

    /** Stores the run flag. */
    void setRunning(boolean running) {
        this->running = running;
    }

    /** @return the run flag */
    boolean isRunning() {
        return this->running;
    }

    /**
     * @return the QuorumCnxManager held in qcm
     */
    sp<QuorumCnxManager> getQuorumCnxManager() {
        return this->qcm;
    }

    /**
     * @return the current epoch, read from CURRENT_EPOCH_FILENAME on first
     *         use (while the cached value is still -1)
     * @throws EIOException if the epoch file cannot be read or parsed
     */
    llong getCurrentEpoch() THROWS(EIOException) {
        if (currentEpoch != -1) {
            return currentEpoch;
        }
        currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        return currentEpoch;
    }
	
	/**
	 * @return the accepted epoch, read from ACCEPTED_EPOCH_FILENAME on first
	 *         use (while the cached value is still -1)
	 * @throws EIOException if the epoch file cannot be read or parsed
	 */
	llong getAcceptedEpoch() THROWS(EIOException) {
		if (acceptedEpoch != -1) {
			return acceptedEpoch;
		}
		acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
		return acceptedEpoch;
	}
	
	/**
	 * Persists the current epoch and then updates the cached value.
	 *
	 * The file is written first so that a failed write (EIOException) does
	 * not leave the in-memory epoch ahead of what is stored on disk.
	 *
	 * @param e new current epoch
	 * @throws EIOException if the epoch file cannot be written
	 */
	void setCurrentEpoch(llong e) THROWS(EIOException) {
		writeLongToFile(CURRENT_EPOCH_FILENAME, e);
		currentEpoch = e;
	}
	
	/**
	 * Persists the accepted epoch and then updates the cached value.
	 *
	 * The file is written first so that a failed write (EIOException) does
	 * not leave the in-memory epoch ahead of what is stored on disk.
	 *
	 * @param e new accepted epoch
	 * @throws EIOException if the epoch file cannot be written
	 */
	void setAcceptedEpoch(llong e) THROWS(EIOException) {
		writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
		acceptedEpoch = e;
	}

    /**
     * Updates leader election info to avoid inconsistencies when
     * a new server tries to join the ensemble.
     * See ZOOKEEPER-1732 for more info.
     * Declared here; defined outside this header.
     */
    void updateElectionVote(llong newEpoch);

    /** Stores the Kerberos quorum service principal and logs the new value. */
    void setQuorumServicePrincipal(EString servicePrincipal) {
        this->quorumServicePrincipal = servicePrincipal;
        LOG->info(EString::formatOf("%s set to %s",
                QuorumAuth::QUORUM_KERBEROS_SERVICE_PRINCIPAL,
                this->quorumServicePrincipal.c_str()));
    }

    /** Stores the quorum learner login context name and logs the new value. */
    void setQuorumLearnerLoginContext(EString learnerContext) {
        this->quorumLearnerLoginContext = learnerContext;
        LOG->info(EString::formatOf("%s set to %s",
                QuorumAuth::QUORUM_LEARNER_SASL_LOGIN_CONTEXT,
                this->quorumLearnerLoginContext.c_str()));
    }

    /** Stores the quorum server login context name and logs the new value. */
    void setQuorumServerLoginContext(EString serverContext) {
        this->quorumServerLoginContext = serverContext;
        LOG->info(EString::formatOf("%s set to %s",
                QuorumAuth::QUORUM_SERVER_SASL_LOGIN_CONTEXT,
                this->quorumServerLoginContext.c_str()));
    }

    void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
        if (qCnxnThreadsSize > QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE) {
            quorumCnxnThreadsSize = qCnxnThreadsSize;
        }
        LOG->info(EString("quorum.cnxn.threads.size set to ") + quorumCnxnThreadsSize);
    }

    // VisibleForTesting. Returns true if both the quorumlearner and
    // quorumserver login has been finished. Otherwise, false.
    boolean hasAuthInitialized(){
        return this->authInitialized;
    }

    /**
     * Builds a new QuorumCnxManager from this peer's id, view, auth objects,
     * listen-on-all-IPs flag and connection thread pool size. The timeout
     * argument is tickTime * syncLimit.
     *
     * @return a freshly created manager (it is not stored in qcm here)
     */
    sp<QuorumCnxManager> createCnxnManager() {
        const int socketTimeout = this->tickTime * this->syncLimit;
        return new QuorumCnxManager(getId(),
                                    getView(),
                                    authServer,
                                    authLearner,
                                    socketTimeout,
                                    getQuorumListenOnAllIPs(),
                                    quorumCnxnThreadsSize);
    }

    /**
     * Sets the time taken for leader election in milliseconds.
     *
     * @param electionTimeTaken
     *            time taken for leader election
     */
    void setElectionTimeTaken(long electionTimeTaken) {
    	this->electionTimeTaken = electionTimeTaken;
    }

    /**
     * @return the time taken for leader election in milliseconds (-1 until
     *         a value has been set)
     */
    llong getElectionTimeTaken() {
        return this->electionTimeTaken;
    }
};

} /* namespace ezk */
} /* namespace efc */
#endif /* QuorumPeer_HH_ */
